# HRRP(高分辨率距离像算法) ## 算法概述 高分辨率距离像(High Resolution Range Profile, HRRP)是雷达信号处理中的核心技术,通过对雷达回波信号进行脉冲压缩和多普勒处理,获取目标在距离维上的散射点分布特征。HRRP具有以下特点: - 距离高分辨:能够区分目标上相邻较近的散射中心 - 特征稳定:为目标识别提供物理特征描述 - 计算高效:相比二维成像计算复杂度较低 - 实时性强:适用于实时目标识别系统 ## 代码来源 ## MindSpore Signal+ 实现 将原有代码重构为基于计算图的MindSpore Cell,实现雷达信号处理流程模块化。 ### 1 数据读取 在Python中使用MindSpore Signal+时,我们可以使用NumPy的fromfile接口读二进制格式文件,因此在Python代码开头需要导入NumPy库。 示例代码: ```python def read_hrrp_data(filename, maichongshu, jln, rfftn): """ 读取HRRP二进制数据文件并处理 """ with open(filename, 'rb') as fp: expected_size = maichongshu * jln * 2 data = np.fromfile(fp, dtype=np.float32) if len(data) != expected_size: raise ValueError(f"数据长度错误:期望{expected_size}个点,实际读取{len(data)}个点") data = data.reshape(maichongshu, jln, 2) datar = data[:, :, 0] datai = data[:, :, 1] # 创建填充0的完整数组 datar_full = np.zeros((maichongshu, rfftn), dtype=np.float32) datai_full = np.zeros((maichongshu, rfftn), dtype=np.float32) datar_full[:, :jln] = datar datai_full[:, :jln] = datai return datar_full, datai_full ``` 读到的数据需要转换为复数形式,以便后续处理。 ```python datar, datai = read_hrrp_data("hrrp_400_512.dat", maichongshu, jln, rfftn) input_data = np.zeros(datar.shape, dtype=np.complex64) input_data = datar + 1j * datai ``` ### 2 数据预处理 从算法整体分析,数据读取后到核心计算之前的步骤,主要是对相位补偿因子计算,Keystone变换参数预计算,泰勒窗函数预计算,循环移位索引预计算。这部分都是核心计算的前期准备,建议将这部分代码封装在`__init__`函数中完成,不放在`construct`函数中可以避免额外的开销,当实例化一个类时自动触发一次`__init__`函数。 示例代码: ```python class Hrrp(nn.Cell): def __init__(self, hangmc, liejl, f0, fs, B, mohuhalfshu, PRF, vdengxiao, mubiaojuli): super(Hrrp, self).__init__() self.hangmc = hangmc self.liejl = liejl self.f0 = f0 self.fs = fs self.B = B self.mohuhalfshu = mohuhalfshu self.PRF = PRF self.vdengxiao = vdengxiao self.mubiaojuli = mubiaojuli self.fft = mr.FFT() self.ifft = mr.IFFT() self.fftshift = mr.FFTShift() self.abs = mr.ComplexAbs() self.stoltsun = mr.Stoltsun(dim=0) self.mul = ops.Mul() self.exp = ops.Exp() self.reduce_sum = ops.ReduceSum(keep_dims=True) self.gather = ops.Gather() wl = LC / self.f0 Ka = 2.0 * self.vdengxiao**2 / (wl * self.mubiaojuli) mctime = (np.arange(self.hangmc) - self.hangmc * 0.5) / self.PRF xiangwei = np.pi * Ka * mctime**2 self.phase_comp = np.exp(1j * xiangwei).astype(np.complex64) self.phase_comp = self.phase_comp.reshape(self.hangmc, 1) self.phase_comp = ms.Tensor(self.phase_comp) fwn = self.hangmc self.fwn = fwn rnfft = Get2intm(self.liejl) self.rnfft = rnfft fwn_8 = (fwn + 7) // 8 * 8 # 8字节对齐 self.fwn_8 = fwn_8 self.xold = (np.arange(fwn_8, dtype=np.float32) - fwn / 2.0)[:, np.newaxis] self.xold = np.broadcast_to(self.xold, (fwn_8, rnfft)) i_arr = np.arange(rnfft, dtype=np.float32) sigma_arr = (f0 + (i_arr - rnfft * 0.5) * fs / rnfft) / f0 self.xnew = self.xold * sigma_arr self.xold = ms.Tensor(self.xold) self.xnew = ms.Tensor(self.xnew) self.mchtr = self.TaiLeWindow(self.hangmc, 50) window_2d = self.mchtr[:, np.newaxis] # 转换为列向量便于广播 self.window_2d = ms.Tensor(window_2d, dtype=ms.float32) self.j_constant = ms.Parameter(ms.Tensor(1.0j, dtype=ms.complex64), name="j_constant") self._precompute_cirshift_indices() def _precompute_cirshift_indices(self): rnfft = self.rnfft shift = self.rnfft // 2 indices = np.arange(rnfft, dtype=np.int32) indices_shifted = (indices - shift) % rnfft self.cirshift_indices = ms.Tensor(indices_shifted, dtype=ms.int32) def construct(self, input_data): ... ``` #### 2.1 相位补偿因子计算 ```python wl = LC / self.f0 # 计算波长 Ka = 2.0 * self.vdengxiao**2 / (wl * self.mubiaojuli) # 调频斜率 mctime = (np.arange(self.hangmc) - self.hangmc * 0.5) / self.PRF # 时间轴 xiangwei = np.pi * Ka * mctime**2 # 相位变化量 self.phase_comp = np.exp(1j * xiangwei).astype(np.complex64) # 相位补偿因子 ``` 功能说明: - 计算二次相位误差补偿因子,用于校正由于目标运动引起的相位畸变 - 将连续时间计算转换为离散时间序列,便于数字信号处理 - 相位补偿因子形状为 (hangmc, 1),便于广播到所有距离单元 #### 2.2 Keystone变换参数预计算 ```python # 关键参数计算 fwn = self.hangmc self.fwn = fwn rnfft = Get2intm(self.liejl) # 获取2的幂次方长度 self.rnfft = rnfft fwn_8 = (fwn + 7) // 8 * 8 # 8字节对齐优化 self.fwn_8 = fwn_8 # 方位向坐标轴生成 self.xold = (np.arange(fwn_8, dtype=np.float32) - fwn / 2.0)[:, np.newaxis] self.xold = np.broadcast_to(self.xold, (fwn_8, rnfft)) # 距离频率轴生成和缩放因子计算 i_arr = np.arange(rnfft, dtype=np.float32) sigma_arr = (f0 + (i_arr - rnfft * 0.5) * fs / rnfft) / f0 self.xnew = self.xold * sigma_arr # 转换为MindSpore Tensor self.xold = ms.Tensor(self.xold) self.xnew = ms.Tensor(self.xnew) ``` 参数设计原理: | 参数 | 计算公式 | 物理意义 | |-----------| ---------------------- | -------------------- | | xold | (i - fwn/2.0) | 方位向原始坐标,零中心化 | | sigma_arr | (f0 + (i - rnfft/2)*fs/rnfft)/f0 | 距离徙动校正因子 | | xnew | xold * sigma_arr | Keystone变换后的坐标 | #### 2.3 泰勒窗函数预计算 ```python self.mchtr = self.TaiLeWindow(self.hangmc, 50) window_2d = self.mchtr[:, np.newaxis] self.window_2d = ms.Tensor(window_2d, dtype=ms.float32) def TaiLeWindow(self, N, param2): # 参数处理 N = max(N, 1) nLevel = 5 nsll = 45 if param2 < 13.0 else param2 # 参数转换 mB = math.pow(10.0, nsll / 20.0) mA = math.log(mB + math.sqrt(mB * mB - 1.0)) / math.pi msquaQ = (nLevel * nLevel) / (mA * mA + (nLevel - 0.5) * (nLevel - 0.5)) # 计算窗函数系数 m_dFm = [0.0] * nLevel for m in range(nLevel - 1): mtmpa = 1.0 mtmpb = 1.0 for i in range(nLevel - 1): # 计算第一个乘积项 term = 1 - (m+1)**2 / (msquaQ * (mA**2 + (i+0.5)**2)) mtmpa *= term # 跳过相同索引 if m == i: continue # 计算第二个乘积项 term2 = 1 - (m+1)**2 / ((i+1)**2) mtmpb *= term2 # 计算系数值 m_dFm[m] = (pow(-1, m+2) * mtmpa) / (2.0 * mtmpb) # 计算窗函数值 h = np.zeros(N) for i in range(N): dwin = 0.0 for m in range(nLevel - 1): angle = 2 * math.pi * (m+1) * (i - N/2.0 + 0.5) / N dwin += m_dFm[m] * math.cos(angle) h[i] = 1.0 + 2.0 * dwin # 归一化处理 mean_val = np.mean(h) if abs(mean_val) > 1e-15: h /= mean_val return h ``` 窗函数作用: - 减少频谱泄露,提高频率分辨率 - 抑制旁瓣,提高主瓣分辨率 - 参数50表示旁瓣抑制比约为50dB #### 2.4 循环移位索引预计算 ```python def _precompute_cirshift_indices(self): rnfft = self.rnfft shift = self.rnfft // 2 # 中心频率点索引 indices = np.arange(rnfft, dtype=np.int32) indices_shifted = (indices - shift) % rnfft self.cirshift_indices = ms.Tensor(indices_shifted, dtype=ms.int32) ``` 循环移位的作用: - 频谱中心化:将零频分量移动到频谱中心 - 视觉优化:便于频谱显示和分析 - 算法要求:某些算法(如IFFT)要求输入是中心对称的 ### 3 核心计算 步骤分解: 1. 相位补偿:首先对输入数据(复数形式)进行相位补偿,乘以预计算好的相位补偿因子(用于校正二次相位误差)。 2. 多模糊数处理:计算需要搜索的模糊数总数(mohushu = 2 * mohuhalfshu + 1),然后对每一个模糊数进行以下操作: a. 计算当前模糊数对应的偏移量(offset = mh - mohuhalfshu),这个偏移量将用于Keystone变换中的相位补偿。 b. 进行Keystone变换(Keystone_interp),这是HRRP算法的核心,用于校正距离徙动。 c. 对Keystone变换后的数据,进行HRRP生成(Gethrrp),得到每一距离单元的幅度和相位信息,同时返回处理后的数据(result)和幅度数据(hrrp)。 d. 计算该HRRP的熵(Gethrrp_v2shang),熵值越小表示能量越集中,成像质量越好。 e. 将当前模糊数对应的结果(result)和熵值(shang_value)分别保存到列表results和shang中。 3. 选择最优结果:将所有模糊数对应的结果堆叠起来(使用ops.cat),然后找到熵最小的那个索引(xuhao = ops.argmin(shang)),并取出该索引对应的结果(min_result = results[xuhao])。 4. 更新输出:将最优结果的第一行(即HRRP处理后的第一行,包含了最优的相位和幅度信息)与原始输入数据的第二行到最后一行拼接起来,形成最终的输出。这里之所以只替换第一行,是因为在HRRP处理中,我们只关心每个距离单元的最强散射点,而原始数据的第一行被用来存储这个信息。 核心算法整体流程: ```python class Hrrp(nn.Cell): def __init__(self, hangmc, liejl, f0, fs, B, mohuhalfshu, PRF, vdengxiao, mubiaojuli): super(Hrrp, self).__init__() self.hangmc = hangmc self.f0 = f0 self.fs = fs self.B = B self.mohuhalfshu = mohuhalfshu self.PRF = PRF self.vdengxiao = vdengxiao self.mubiaojuli = mubiaojuli ... wl = LC / self.f0 Ka = 2.0 * self.vdengxiao**2 / (wl * self.mubiaojuli) mctime = (np.arange(self.hangmc) - self.hangmc * 0.5) / self.PRF xiangwei = np.pi * Ka * mctime**2 self.phase_comp = np.exp(1j * xiangwei).astype(np.complex64) self.phase_comp = self.phase_comp.reshape(self.hangmc, 1) self.phase_comp = ms.Tensor(self.phase_comp) ... def construct(self, input_data): out = input_data * self.phase_comp mohushu = 2 * self.mohuhalfshu + 1 results = [] shang = [] for mh in range(mohushu): offset = mh - self.mohuhalfshu tmp = self.Keystone_interp(out, self.f0, self.fs, self.B, offset) hrrp, result = self.Gethrrp(tmp, self.hangmc, self.liejl) shang_value = self.Gethrrp_v2shang(hrrp) shang.append(shang_value) result = ops.expand_dims(result, 0) results.append(result) results = ops.cat(results, axis=0) shang = ops.cat(shang) xuhao = ops.argmin(shang) min_result = results[xuhao] out = ops.cat([min_result[0:1, :], out[1:, :]], axis=0) return out ``` #### 3.1 相位补偿 ```python out = input_data * self.phase_comp ``` 物理意义: - 补偿目标:校正由于雷达与目标相对运动引起的二次相位误差 - 误差来源:目标斜距变化引起的时间延迟变化 - 数学形式:补偿因子为 exp(j·π·Ka·t²),其中Ka为调频斜率 #### 3.2 多模糊数处理循环 1. 模糊数范围确定: ```python mohushu = 2 * self.mohuhalfshu + 1 ``` 设计原理: - 由于多普勒频率模糊,真实的调频斜率有多个候选值 - 搜索范围:[-mohuhalfshu, +mohuhalfshu] - 典型设置:mohuhalfshu=5 → 搜索11个候选值 2. 循环处理每个模糊数: ```python for mh in range(mohushu): offset = mh - self.mohuhalfshu # 当前模糊数偏移 tmp = self.Keystone_interp(out, self.f0, self.fs, self.B, offset) hrrp, result = self.Gethrrp(tmp, self.hangmc, self.liejl) shang_value = self.Gethrrp_v2shang(hrrp) shang.append(shang_value) result = ops.expand_dims(result, 0) results.append(result) ``` 处理流程分解: |步骤 |功能 | 关键操作 | |--------------| ----------- | -------------------- | |Keystone变换 |距离徙动校正 | 使用当前模糊数进行插值 | |HRRP生成 |距离像提取 | 方位向FFT,提取峰值 | |熵计算 | 质量评估 | 计算HRRP的熵值 | #### 3.3 Keystone变换详解 1. 变换原理 Keystone变换是一种距离徙动校正算法,用于解决以下问题:
变换公式: 原始坐标:t (方位时间), f (距离频率) 变换后:t' = t × (f₀/(f₀+f)) 2. 代码实现 ```python # 1. 距离向FFT(转到距离频率域) # 2. 相位补偿(补偿距离徙动),方位向插值(Keystone变换核心) # 3. 距离向IFFT(转回时域) def Keystone_interp(self, input_data, f0, fs, B, moshuzhouqishu): # 获取维度 fwn = self.fwn fwn_8 = self.fwn_8 # 计算距离FFT长度 rnfft = self.rnfft # 模块1: 距离向FFT out = self.FFT4f(input_data, 1) nyquist_idx = rnfft//2 new_nyquist_values = (out[:, nyquist_idx-1] + out[:, nyquist_idx+1]) / 2.0 # 使用concat操作替换特定列 left_part = out[:, :nyquist_idx] nyquist_part = new_nyquist_values.expand_dims(1) # 增加维度 right_part = out[:, nyquist_idx+1:] # 重新拼接 out = ops.concat([left_part, nyquist_part, right_part], axis=1) # 计算无小点数量 wuxiaodiansujl = int(np.floor(0.5 * (fs - B) * rnfft / fs)) wuxiaodiansujl = (wuxiaodiansujl >> 3) << 3 # 8字节对齐 wuxiaodiansujl = max(0, wuxiaodiansujl) # 模块2: 方位向插值 pindian = (ops.arange(0, rnfft, dtype=ms.float32) - 0.5 * rnfft) * fs / rnfft j_indices = ops.arange(0, fwn, dtype=ms.float32).reshape(-1, 1) angle = -2 * PI * j_indices * pindian / f0 * moshuzhouqishu complex_angle = angle.astype(ms.complex64) j_times_angle = complex_angle * self.j_constant phase_comp_all = self.exp(j_times_angle) temp_complex_top = out * phase_comp_all temp_complex = ops.Pad(((0, fwn_8 - fwn), (0, 0)))(temp_complex_top) temp_complex_valid = temp_complex[:, wuxiaodiansujl:rnfft - wuxiaodiansujl] xnew_valid = self.xnew[:, wuxiaodiansujl:rnfft - wuxiaodiansujl] xold_valid = self.xold[:, wuxiaodiansujl:rnfft - wuxiaodiansujl] tmp = self.stoltsun(temp_complex_valid, xnew_valid, xold_valid) update_part = tmp left_part = out[:fwn, :wuxiaodiansujl] right_part = out[:fwn, rnfft - wuxiaodiansujl:] # 在列方向拼接三部分 out = ops.concat([left_part, update_part, right_part], axis=1) # 模块3: 距离向IFFT out = self.cirshift_optimized(out) out = self.IFFT4f(out, 0) return out ``` #### 3.4 HRRP生成 ```python # 1. 方位向加窗(泰勒窗) # 2. 方位向FFT(转到多普勒域) # 3. 峰值提取(每个距离单元的幅度) # 4. 返回HRRP幅度和复数值 def Gethrrp(self, input_data, hangmc, liejln): afftn = Get2intm(hangmc) # 只取有效数据部分进行加窗处理 windowed_data = input_data[:hangmc, :liejln] * self.window_2d windowed_data = windowed_data.astype(ms.complex64) windowed_data_t = ops.transpose(windowed_data, (1, 0)) padding = ((0, 0), (0, afftn - windowed_data_t.shape[1])) fft_input = ops.Pad(paddings=padding)(windowed_data_t) fft_ret = self.FFT4f(fft_input, 0) # 计算幅度平方 - 向量化操作 # fd = self.abs(fft_ret)**2 abs_result = ops.cast(ops.abs(fft_ret), ms.float32) fd = abs_result**2 # 找每列的峰值位置 - 向量化操作 maxp_indices = ops.argmax(fd, dim=1) # 提取峰值幅度 - 向量化操作 row_indices = ops.arange(liejln, dtype=ms.int32) hrrp = fd[row_indices, maxp_indices] peak_values = fft_ret[row_indices, maxp_indices] peak_values_2d = ops.expand_dims(peak_values, 0) result = ops.cat([peak_values_2d, input_data[1:, :]], axis=0) return hrrp, result ``` HRRP定义: - 高分辨率距离像 = 目标在距离维的散射点分布 - 物理意义:目标的"电磁指纹",用于目标识别 #### 3.5 熵值评估 ```python def Gethrrp_v2shang(self, x): sum_sq = self.reduce_sum(x**2) # 总能量 is_nan = sum_sq != sum_sq # 检查无效情况 is_valid = ops.logical_and(sum_sq > 1e-10, ops.logical_not(is_nan)) # 归一化并计算熵 t = x / (sum_sq) log_t = ops.log(t) entropy = -self.reduce_sum(t * log_t) # 香农熵 entropy = entropy.reshape(1) # 如果无效则返回0 result = ops.where(is_valid, entropy, ops.zeros(1, ms.float32)) result = result.reshape(1) return result ``` 熵的物理意义: |HRRP特征 |熵值表现 |物理解释| |----------|------------|--------| |能量集中 |低熵值 |少数强散射点,成像清晰| |能量分散 |高熵值 |多个弱散射点,成像模糊| |噪声影响 |高熵值 |背景噪声使能量分散| 熵值选择准则:熵值越小 → 能量越集中 → 成像质量越好 → 模糊数越准确 #### 3.6 最优结果选择 ```python # 合并所有结果 results = ops.cat(results, axis=0) # [mohushu, hangmc, liejl] shang = ops.cat(shang) # [mohushu] # 找到最小熵对应的索引 xuhao = ops.argmin(shang) # 最优模糊数索引 # 选择最优结果 min_result = results[xuhao] # 最优HRRP结果 out = ops.cat([min_result[0:1, :], out[1:, :]], axis=0) # 更新最终结果 ``` ### 4 数据后处理 这里将结果导出成二进制数据的dat文件,用于和正确结果比较。确认结果无误后,可通过 ``mindspore.export`` 导出 MINDIR 模型,便于在 MindSpore Lite 端部署(板卡侧运行)。 ## 板卡部署 模型部署建议使用 ``YHFT-IDE``,它集成了模型转换、模型可视化与 MindSpore Lite 端部署模板。具体使用方法可参考 {ref}`HelloDSP MindSpore Lite端 `。 以下是完整的MindSpore Signal+实现的代码: ```python import mindspore as ms import numpy as np from mindspore import nn, ops import math import mindradar as mr def read_hrrp_data(filename, maichongshu, jln, rfftn): # 打开二进制文件并读取所有数据 with open(filename, 'rb') as fp: # 计算预期数据量并读取 expected_size = maichongshu * jln * 2 # 每个复数点包含实部和虚部 data = np.fromfile(fp, dtype=np.float32) # 验证数据完整性 if len(data) != expected_size: raise ValueError(f"数据长度错误:期望{expected_size}个点,实际读取{len(data)}个点") # 重塑数据为3D数组 [脉冲索引, 数据点, 实部/虚部] data = data.reshape(maichongshu, jln, 2) # 分离实部和虚部 datar = data[:, :, 0] # 实部 [maichongshu, jln] datai = data[:, :, 1] # 虚部 [maichongshu, jln] # 创建填充0的完整数组 datar_full = np.zeros((maichongshu, rfftn), dtype=np.float32) datai_full = np.zeros((maichongshu, rfftn), dtype=np.float32) # 复制有效数据部分 datar_full[:, :jln] = datar datai_full[:, :jln] = datai return datar_full, datai_full def Get2intm(intnumb): # 验证输入有效性 if not isinstance(intnumb, int) or intnumb <= 0: raise ValueError("Input must be a positive integer") # 特殊处理1的情况(log2(1)=0) if intnumb == 1: return 1 # 使用更精确的log2计算 log2_val = math.log2(intnumb) # 向上取整获取指数 nn = math.ceil(log2_val) # 计算2的nn次方 result = 2 ** nn return int(result) # 转换为整数类型 def write_hrrp_binary_v2(filename, datar, datai, maichongshu, jln): # 参数强验证 if datar.shape != (maichongshu, 512) or datai.shape != (maichongshu, 512): raise ValueError(f"数组维度必须为 ({maichongshu}, 512)") # 创建交错存储矩阵 interleaved = np.empty((maichongshu, jln*2), dtype=np.float32) # 精确截取前2560列 interleaved[:, ::2] = datar[:, :jln] # 实部 interleaved[:, 1::2] = datai[:, :jln] # 虚部 # 原子写入操作 interleaved.tofile(filename) class Hrrp(nn.Cell): def __init__(self, hangmc, liejl, f0, fs, B, mohuhalfshu, PRF, vdengxiao, mubiaojuli): super(Hrrp, self).__init__() self.hangmc = hangmc self.liejl = liejl self.f0 = f0 self.fs = fs self.B = B self.mohuhalfshu = mohuhalfshu self.PRF = PRF self.vdengxiao = vdengxiao self.mubiaojuli = mubiaojuli self.fft = mr.FFT() self.ifft = mr.IFFT() self.fftshift = mr.FFTShift() self.stoltsun = mr.Stoltsun(dim=0) self.mul = ops.Mul() self.exp = ops.Exp() self.reduce_sum = ops.ReduceSum(keep_dims=True) self.gather = ops.Gather() wl = LC / self.f0 Ka = 2.0 * self.vdengxiao**2 / (wl * self.mubiaojuli) mctime = (np.arange(self.hangmc) - self.hangmc * 0.5) / self.PRF xiangwei = np.pi * Ka * mctime**2 self.phase_comp = np.exp(1j * xiangwei).astype(np.complex64) self.phase_comp = self.phase_comp.reshape(self.hangmc, 1) self.phase_comp = ms.Tensor(self.phase_comp) fwn = self.hangmc self.fwn = fwn rnfft = Get2intm(self.liejl) self.rnfft = rnfft fwn_8 = (fwn + 7) // 8 * 8 # 8字节对齐 self.fwn_8 = fwn_8 self.xold = (np.arange(fwn_8, dtype=np.float32) - fwn / 2.0)[:, np.newaxis] self.xold = np.broadcast_to(self.xold, (fwn_8, rnfft)) i_arr = np.arange(rnfft, dtype=np.float32) sigma_arr = (f0 + (i_arr - rnfft * 0.5) * fs / rnfft) / f0 self.xnew = self.xold * sigma_arr self.xold = ms.Tensor(self.xold) self.xnew = ms.Tensor(self.xnew) self.mchtr = self.TaiLeWindow(self.hangmc, 50) window_2d = self.mchtr[:, np.newaxis] # 转换为列向量便于广播 self.window_2d = ms.Tensor(window_2d, dtype=ms.float32) self.j_constant = ms.Parameter(ms.Tensor(1.0j, dtype=ms.complex64), name="j_constant") self._precompute_cirshift_indices() def _precompute_cirshift_indices(self): rnfft = self.rnfft shift = self.rnfft // 2 # 创建索引数组 [0, 1, 2, ..., rnfft-1] indices = np.arange(rnfft, dtype=np.int32) # 计算移位后的索引 # 对于左移shift:new_index = (old_index - shift) mod rnfft indices_shifted = (indices - shift) % rnfft # 转换为Tensor self.cirshift_indices = ms.Tensor(indices_shifted, dtype=ms.int32) def construct(self, input_data): out = input_data * self.phase_comp mohushu = 2 * self.mohuhalfshu + 1 results = [] shang = [] for mh in range(mohushu): offset = mh - self.mohuhalfshu tmp = self.Keystone_interp(out, self.f0, self.fs, self.B, offset) hrrp, result = self.Gethrrp(tmp, self.hangmc, self.liejl) shang_value = self.Gethrrp_v2shang(hrrp) shang.append(shang_value) result = ops.expand_dims(result, 0) results.append(result) results = ops.cat(results, axis=0) shang = ops.cat(shang) xuhao = ops.argmin(shang) min_result = results[xuhao] # 将min_result的第一行和out的第二行到最后一行连接起来 out = ops.cat([min_result[0:1, :], out[1:, :]], axis=0) return out def TaiLeWindow(self, N, param2): """ 泰勒窗函数实现 参数: N: 窗口长度 param2: 加权分贝值 (20~50) 返回: 归一化的窗函数数组 """ # 参数处理 N = max(N, 1) nLevel = 5 nsll = 45 if param2 < 13.0 else param2 # 参数转换 mB = math.pow(10.0, nsll / 20.0) mA = math.log(mB + math.sqrt(mB * mB - 1.0)) / math.pi msquaQ = (nLevel * nLevel) / (mA * mA + (nLevel - 0.5) * (nLevel - 0.5)) # 计算窗函数系数 m_dFm = [0.0] * nLevel for m in range(nLevel - 1): mtmpa = 1.0 mtmpb = 1.0 for i in range(nLevel - 1): # 计算第一个乘积项 term = 1 - (m+1)**2 / (msquaQ * (mA**2 + (i+0.5)**2)) mtmpa *= term # 跳过相同索引 if m == i: continue # 计算第二个乘积项 term2 = 1 - (m+1)**2 / ((i+1)**2) mtmpb *= term2 # 计算系数值 m_dFm[m] = (pow(-1, m+2) * mtmpa) / (2.0 * mtmpb) # 计算窗函数值 h = np.zeros(N) for i in range(N): dwin = 0.0 for m in range(nLevel - 1): angle = 2 * math.pi * (m+1) * (i - N/2.0 + 0.5) / N dwin += m_dFm[m] * math.cos(angle) h[i] = 1.0 + 2.0 * dwin # 归一化处理 mean_val = np.mean(h) if abs(mean_val) > 1e-15: h /= mean_val return h def Gethrrp_v2shang(self, x): sum_sq = self.reduce_sum(x**2) is_nan = sum_sq != sum_sq # 检查无效情况 is_valid = ops.logical_and(sum_sq > 1e-10, ops.logical_not(is_nan)) # 归一化并计算熵 t = x / (sum_sq) log_t = ops.log(t) entropy = -self.reduce_sum(t * log_t) entropy = entropy.reshape(1) # 如果无效则返回0 result = ops.where(is_valid, entropy, ops.zeros(1, ms.float32)) result = result.reshape(1) return result def Gethrrp(self, input_data, hangmc, liejln): afftn = Get2intm(hangmc) # 只取有效数据部分进行加窗处理 windowed_data = input_data[:hangmc, :liejln] * self.window_2d windowed_data = windowed_data.astype(ms.complex64) windowed_data_t = ops.transpose(windowed_data, (1, 0)) padding = ((0, 0), (0, afftn - windowed_data_t.shape[1])) fft_input = ops.Pad(paddings=padding)(windowed_data_t) fft_ret = self.FFT4f(fft_input, 0) # 计算幅度平方 - 向量化操作 abs_result = ops.cast(ops.abs(fft_ret), ms.float32) fd = abs_result**2 # 找每列的峰值位置 - 向量化操作 maxp_indices = ops.argmax(fd, dim=1) # 提取峰值幅度 - 向量化操作 row_indices = ops.arange(liejln, dtype=ms.int32) hrrp = fd[row_indices, maxp_indices] peak_values = fft_ret[row_indices, maxp_indices] peak_values_2d = ops.expand_dims(peak_values, 0) result = ops.cat([peak_values_2d, input_data[1:, :]], axis=0) return hrrp, result def Keystone_interp(self, input_data, f0, fs, B, moshuzhouqishu): # 获取维度 fwn = self.fwn fwn_8 = self.fwn_8 # 计算距离FFT长度 rnfft = self.rnfft # 模块1: 距离向FFT out = self.FFT4f(input_data, 1) nyquist_idx = rnfft//2 new_nyquist_values = (out[:, nyquist_idx-1] + out[:, nyquist_idx+1]) / 2.0 # 使用concat操作替换特定列 left_part = out[:, :nyquist_idx] nyquist_part = new_nyquist_values.expand_dims(1) # 增加维度 right_part = out[:, nyquist_idx+1:] # 重新拼接 out = ops.concat([left_part, nyquist_part, right_part], axis=1) # 计算无小点数量 wuxiaodiansujl = int(np.floor(0.5 * (fs - B) * rnfft / fs)) wuxiaodiansujl = (wuxiaodiansujl >> 3) << 3 # 8字节对齐 wuxiaodiansujl = max(0, wuxiaodiansujl) # 模块2: 方位向插值 pindian = (ops.arange(0, rnfft, dtype=ms.float32) - 0.5 * rnfft) * fs / rnfft j_indices = ops.arange(0, fwn, dtype=ms.float32).reshape(-1, 1) angle = -2 * PI * j_indices * pindian / f0 * moshuzhouqishu complex_angle = angle.astype(ms.complex64) j_times_angle = complex_angle * self.j_constant phase_comp_all = self.exp(j_times_angle) temp_complex_top = out * phase_comp_all temp_complex = ops.Pad(((0, fwn_8 - fwn), (0, 0)))(temp_complex_top) temp_complex_valid = temp_complex[:, wuxiaodiansujl:rnfft - wuxiaodiansujl] xnew_valid = self.xnew[:, wuxiaodiansujl:rnfft - wuxiaodiansujl] xold_valid = self.xold[:, wuxiaodiansujl:rnfft - wuxiaodiansujl] tmp = self.stoltsun(temp_complex_valid, xnew_valid, xold_valid) update_part = tmp left_part = out[:fwn, :wuxiaodiansujl] right_part = out[:fwn, rnfft - wuxiaodiansujl:] # 在列方向拼接三部分 out = ops.concat([left_part, update_part, right_part], axis=1) # 模块3: 距离向IFFT # out = self.cirshift(out, rnfft, rnfft // 2) out = self.cirshift_optimized(out) out = self.IFFT4f(out, 0) return out def cirshift_optimized(self, input_data): """优化后的循环移位""" # 使用预计算的索引进行gather操作 return self.gather(input_data, self.cirshift_indices, 1) def cirshift(self, input_data, n, shift): shift = shift % n if shift == 0: return input_data if input_data.ndim == 1: # 一维情况 out = ops.concat([input_data[-shift:], input_data[:-shift]]) else: # 二维情况,沿axis=1循环移位 out = ops.concat([input_data[:, -shift:], input_data[:, :-shift]], axis=1) return out def FFT4f(self, input_data, is_inverse): fft_ret = self.fft(input_data) if is_inverse: fft_ret = self.fftshift(fft_ret) end_time = time.time() * 1000 return fft_ret def IFFT4f(self, input_data, is_inverse): ifft_ret = self.ifft(input_data) if is_inverse: ifft_ret = self.fftshift(ifft_ret) return ifft_ret PI = 3.1415926535897 LC = 299.792458 maichongshu = 400 jln = 512 f0 = 9500 fs = 3600.0 B = 3000 PRF = 2000 vdengxiao = 0 mubiaojuli = 700e3 mohuhalfshu = 1 mohuhalfshu = (int)((15 * 2 / (LC / f0) / PRF) * 1.5) + 1 mohuhalfshu = 5 rfftn = Get2intm(jln) afftn = Get2intm(maichongshu) datar, datai = read_hrrp_data("hrrp_400_512.dat", maichongshu, jln, rfftn) input_data = np.zeros(datar.shape, dtype=np.complex64) input_data = datar + 1j * datai input_tensor = ms.Tensor(input_data) print(input_data.shape) print(input_tensor) model = Hrrp(maichongshu, jln, f0, fs, B, mohuhalfshu, PRF, vdengxiao, mubiaojuli) out = model(input_tensor) out = out.asnumpy() dr_out = out.real di_out = out.imag write_hrrp_binary_v2("hrrp_out_400_512_py.dat", dr_out, di_out, maichongshu, jln) print(out.shape) print(out) ms.export(model, input_tensor, file_name="hrrp_400_512", file_format='MINDIR') ```